package flink.eureka.window

import org.apache.flink.api.common.functions.{AggregateFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/**
  *
  * @author com.eureka.wh   
  * @since 2019/6/25 23:36
  */
object TestAggFunctionOnWindow {

  val t3 = Array[(String, String, Long)](
    ("class1", "张三", 100L),
    ("class1", "李四", 78L),
    ("class1", "王五", 99L),
    ("class1", "王五", 99L),
    ("class2", "赵六", 81L),
    ("class2", "钱七", 59L),
    ("class2", "马二", 97L),
    ("class2", "马二", 97L)
  )

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = env.fromCollection(t3)
    val avgScore = input.keyBy(0).countWindow(2).aggregate(new MyAggregate)
    avgScore.print()
    env.execute()
  }
}

class MyAggregate extends AggregateFunction[(String, String, Long), (Long, Long), Double] {

  override def createAccumulator(): (Long, Long) = (0L, 0L)

  override def add(value: (String, String, Long), accumulator: (Long, Long)): (Long, Long) = {
    (accumulator._1 + value._3, accumulator._2 + 1L)
  }

  override def getResult(accumulator: (Long, Long)): Double = (accumulator._1 / accumulator._2)

  override def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2)
}
